#!/usr/bin/perl
use File::Basename;
use MIME::Base64;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use POSIX;
use Time::HiRes qw(usleep nanosleep);
use strict;

# Command line options, filled in by getOption():
#   -h (repeatable), -f, -t, --duration, --subtask, --basetime, --help, --version
my (@HOST_ARRAY,$HOST_FILE,$TEST_TYPE,$DURATION,$SUBTASK,$BASE_TIME,$IS_HELP,$VERSION);
# Decoded name of the local host; only assigned by checkOption() in sub task mode.
my ($LOCAL_HOSTNAME);
# Process id of this run, zero padded on the left and cut to its last six characters.
my $MAIN_PID = substr("000000".$$,-6);
# Script file name without its directory part.
(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
# Directory holding this script; mainThreadTask() copies the script from here.
my $CMD_PATH = dirname(__FILE__);
# External commands checkEnvironment() looks up with `which`.
# "timeout" is listed because mainThreadTask() and subThreadTask() wrap
# their ssh/netperf calls in it.
my @NEED_COMMAND = ("scp","ssh","perl",'ps',"timeout","netserver","netperf");
# %HOST_TIME_BASE : per host start time computed by mainCheckDelay().
# @RESULT_ARRAY   : measurement rows collected by startMainTask()/startSubTask().
my (%HOST_TIME_BASE,@RESULT_ARRAY);

# Text printed by --help.
# Kept as one interpolating here-document so that every $CMD_NAME, including
# the ones in the Examples section, is replaced by the script name and a
# literal '#' can be written without ending the string.
my $HELP_MESSAGE = <<"END_OF_HELP";
COMMAND NAME: $CMD_NAME
Check a Greenplum database network performance.

Developed by Miao Chen

Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn

************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME [-h hostname [-h hostname ...]]
    [-f hostname file]
    [-t test type]
    [--duration second]
    [--help]
    [--version]
*****************************************************
DESCRIPTION
*****************************************************
The $CMD_NAME utility is used to check the Greenplum cluster network performance.
When you start $CMD_NAME,the utility will run a matrix netperf test between all hosts.

Before run the check, you should make sure all the hosts have complete ssh key exchange.
*****************************************************
OPTIONS
*****************************************************

-h <hostname>

  A hostname or IP address, which will be used to matrix netperf test.
  This option can be specified multiple times to include multiple hosts.

  eg.
  -h mdw001 -h mdw002

-f <hostname file>

  The location and name of file containing list of hostname for execute matrix netperf.
  Every host per line.
  Utility will ignore any line start with # or blank line.

-t <test type>

  Which mode will test, current only support:
  TCP_STREAM and UDP_STREAM, default is TCP_STREAM.

  eg.
  -t UDP_STREAM

--duration second

  How many seconds every netperf test will run, default is 30.

--help

  Displays the online help.

--version

  Displays the command version.

Examples:
$CMD_NAME -f allhosts --duration 60
$CMD_NAME -f allhosts --duration 60 -t TCP_STREAM
$CMD_NAME --help
END_OF_HELP

# Return a copy of the given string without leading and trailing whitespace.
sub trim{
    my $text = shift;
    $text =~ s/^\s+//;
    $text =~ s/\s+$//;
    return $text;
}
# Base64 encode a string and return it as one line (no line breaks),
# so the result can be used as a single command line token.
sub encode{
    my ($plain) = @_;
    # An empty end-of-line argument makes encode_base64 emit no newlines.
    return encode_base64($plain,"");
}
# Decode a base64 string produced by encode() and return the plain text.
sub decode{
    my ($encoded) = @_;
    return decode_base64($encoded);
}
# Format a number with a fixed number of decimal places.
#   $float     : number to format
#   $precision : number of decimal places; when it is missing, empty, zero
#                or negative the value is returned unchanged
# Returns the rounded value as a string, e.g. round(1.239,2) gives "1.24".
# sprintf is used so the value is really rounded (not cut off) and numbers
# that stringify in exponent notation are formatted correctly as well.
sub round{
    my ($float,$precision) = @_;
    if(!defined $precision || $precision eq "" || $precision <= 0){
        return $float;
    }
    return sprintf("%.*f",$precision,$float);
}
# Print a message on standard output: surrounding whitespace is removed
# and exactly one newline is appended.
sub logMessage{
    my $text = trim($_[0]);
    print $text."\n";
}
# Print the message through logMessage() and terminate the program
# with exit status 1. Never returns.
sub errorMessage{
    logMessage(shift);
    exit 1;
}
# Run a command line and capture its standard output.
#   $command : command line handed to readpipe
# Returns ($output,$exit_code):
#   $output    : captured standard output, trimmed ("" when nothing was read)
#   $exit_code : 0 only when the command ran and exited with status 0
# A command that is terminated by a signal has a zero exit byte in $?;
# it is reported as 128 + signal number so callers never mistake it for
# success. A command that could not be started is reported as 127.
sub executeCommand{
    my ($command) = @_;
    my $output = readpipe($command);
    my $status = $?;
    my $exit_code;
    if($status == -1){
        # the command could not be started at all
        $exit_code = 127;
    }elsif($status & 127){
        # the low seven bits hold the number of the terminating signal
        $exit_code = 128 + ($status & 127);
    }else{
        $exit_code = $status >> 8;
    }
    $output = trim(defined $output ? $output : "");
    return ($output,$exit_code);
}

# Parse the command line into the option globals.
# Stops with an error when arguments are left over, and handles --help
# and --version directly (both print and exit with status 0).
sub getOption{
    my @option_spec = (
        'h:s'         => \@HOST_ARRAY,
        'f:s'         => \$HOST_FILE,
        't:s'         => \$TEST_TYPE,
        'duration:i'  => \$DURATION,
        'subtask:s'   => \$SUBTASK,
        'basetime:f'  => \$BASE_TIME,
        'help!'       => \$IS_HELP,
        'version!'    => \$VERSION,
    );
    GetOptions(@option_spec);
    # Getopt::Long runs with "passthrough", so whatever it did not
    # recognise is still in @ARGV at this point.
    if(scalar(@ARGV) > 0){
        errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help");
    }
    if($IS_HELP){
        print $HELP_MESSAGE;
        exit 0;
    }
    if($VERSION){
        print "$CMD_NAME 0.1\n";
        exit 0;
    }
}
# Make sure every command listed in @NEED_COMMAND can be found with `which`.
# Stops the program with an error naming all missing commands.
sub checkEnvironment{
    my @missing = grep {
        system("which $_ > /dev/null 2>&1");
        ($? >> 8) != 0;
    } @NEED_COMMAND;
    if(scalar(@missing) > 0){
        errorMessage("Environment not enough to continue install, command ".join(',',@missing)." not found in current environment");
    }
}
# Validate the parsed options and build the final, de-duplicated host list.
#
# Main mode (no --subtask): hosts come from every -h option plus the
# non-blank, non-comment lines of the -f file; at least two are required.
#
# Sub task mode (--subtask given): -h carries the base64 encoded name of the
# local host and --subtask a '.' separated list of base64 encoded host names.
# $LOCAL_HOSTNAME is set to the decoded local name and @HOST_ARRAY to all
# other hosts; at least one is required.
#
# Also applies the defaults TCP_STREAM for -t and 30 for --duration.
# Stops the program through errorMessage() when a check fails.
sub checkOption{
    my %host_hash = ();
    my $is_subtask = (defined $SUBTASK && $SUBTASK ne "") ? 1 : 0;
    if(!$is_subtask){
        for my $hostname(@HOST_ARRAY){
            # "-h" without a value yields an empty string, which is no host
            next if !defined $hostname || $hostname eq "";
            $host_hash{$hostname} = "";
        }
        if(defined $HOST_FILE && $HOST_FILE ne ""){
            my $file_handle;
            if(!open($file_handle,"<",$HOST_FILE)){
                errorMessage("Can't open file: $HOST_FILE");
            }
            while(my $hostname = <$file_handle>){
                $hostname = trim($hostname);
                if($hostname =~ /^#/ || $hostname eq ""){
                    next;
                }
                $host_hash{$hostname} = "";
            }
            close($file_handle);
        }
    }else{
        # Decode the local name first: the names below are compared in their
        # decoded form, so comparing against the encoded value never matches.
        $LOCAL_HOSTNAME = decode($HOST_ARRAY[0]);
        for my $encoded_name(split(/\./,$SUBTASK)){
            my $hostname = decode($encoded_name);
            if($hostname ne $LOCAL_HOSTNAME){
                $host_hash{$hostname} = "";
            }
        }
    }
    @HOST_ARRAY = keys %host_hash;
    if((!$is_subtask && @HOST_ARRAY < 2) || ($is_subtask && @HOST_ARRAY < 1)){
        errorMessage("Hostname number is not enough for netcheck");
    }
    if(!defined $TEST_TYPE || $TEST_TYPE eq ""){
        $TEST_TYPE = "TCP_STREAM";
    }
    if($TEST_TYPE ne "TCP_STREAM" && $TEST_TYPE ne "UDP_STREAM"){
        errorMessage("Please specify -t with value TCP_STREAM or UDP_STREAM");
    }
    if(!defined $DURATION || $DURATION eq ""){
        $DURATION = 30;
    }
}
# Thread body: read the clock of one host over ssh.
#   $hostname  : host to ask
#   $base_time : local timestamp taken before the threads were started
# Returns ($code,$hostname,$host_time,$elapsed):
#   $code      : exit code of the ssh call
#   $host_time : output of `date +%s.%N` on the host
#   $elapsed   : local time passed between $base_time and the end of the call
sub mainThreadCheck{
    my ($hostname,$base_time) = @_;
    my ($host_time,$code) = executeCommand(qq{ssh $hostname -T 'date +%s.%N'});
    my $finished_at = trim(readpipe("date +%s.%N"));
    return ($code,$hostname,$host_time,$finished_at - $base_time);
}
# Ask every host for its clock in parallel and plan a common start time.
#
# For each host %HOST_TIME_BASE receives: reported host time, minus the
# local time the request took, plus the delay returned by this function.
# The delay is the slowest request time plus a margin:
#   below 10 seconds  -> + 10
#   above 30 seconds  -> + 20
#   otherwise         -> + half of (slowest + 10)
# Exits with status 1 when any host could not be reached (the output of
# the failed call is logged).
# Returns the delay in seconds.
sub mainCheckDelay{
    my $base_time = trim(readpipe("date +%s.%N"));
    # The map block runs threads->new in list context, so join below
    # returns the complete result list of mainThreadCheck.
    my @workers = map { threads->new(\&mainThreadCheck,$_,$base_time) } @HOST_ARRAY;
    my $any_failed = 0;
    my $slowest = 0.0;
    for my $worker(@workers){
        my ($code,$hostname,$host_time,$elapsed) = $worker->join;
        if($code != 0){
            $any_failed = 1;
            logMessage($host_time);
            next;
        }
        $slowest = $elapsed if $elapsed > $slowest;
        $HOST_TIME_BASE{$hostname} = $host_time - $elapsed;
    }
    exit 1 if $any_failed;
    my $margin;
    if($slowest < 10.0){
        $margin = 10.0;
    }elsif($slowest > 30.0){
        $margin = 20.0;
    }else{
        $margin = ($slowest + 10.0) * 0.5;
    }
    my $max_delay = $slowest + $margin;
    for my $hostname(keys %HOST_TIME_BASE){
        $HOST_TIME_BASE{$hostname} += $max_delay;
    }
    return $max_delay;
}
# Thread body: copy this script to one host and run it there in sub task mode.
#   $hostname    : host to run on
#   $max_delay   : start delay in seconds, added to the ssh time limit
#   $base_time   : start time handed to the host as --basetime
#   $task_string : '.' separated base64 host list handed over as --subtask
# Returns ($output,$code): output and exit code of the remote run. When the
# script cannot be copied, the remote run is skipped and a description of
# the copy failure is returned together with the scp exit code, so a stale
# or missing copy in /tmp is never executed.
sub mainThreadTask{
    my ($hostname,$max_delay,$base_time,$task_string) = @_;
    # time limit: test duration plus start delay plus a few spare seconds
    my $timeout = int($DURATION + $max_delay) + 6;
    if($TEST_TYPE eq "UDP_STREAM"){
        $timeout *= 2;
    }
    my ($copy_output,$copy_code) = executeCommand(qq{scp $CMD_PATH/$CMD_NAME $hostname:/tmp/});
    if($copy_code != 0){
        return ("Copy $CMD_NAME to host $hostname failed with exit code $copy_code\n".$copy_output,$copy_code);
    }
    # the host name is passed base64 encoded, checkOption decodes it remotely
    my $hostname_encode = encode($hostname);
    my $command = qq{timeout -s 9 $timeout ssh $hostname -T 'perl /tmp/$CMD_NAME -h $hostname_encode --basetime $base_time -t $TEST_TYPE --duration $DURATION --subtask $task_string'};
    my ($output,$code) = executeCommand($command);
    return ($output,$code);
}
# Kill every process whose `ps ax` line matches netperf or netserver on
# all hosts of @HOST_ARRAY. The kill pipeline is sent to each host as a
# quoted here-document on the ssh command line.
sub mainKillAll{
    my $kill_bash = q#ps ax|egrep 'netperf|netserver'|egrep -vw 'grep|egrep'|awk '{system("kill -9 "$1)}'#;
    # everything after the ssh line is the same for all hosts
    my $script_tail = $kill_bash."\nEND_OF_SSH";
    for my $hostname(@HOST_ARRAY){
        executeCommand(qq{ssh $hostname -T <<'END_OF_SSH'\n}.$script_tail);
    }
}
# Run the sub task on every host in parallel and collect the results.
#   $max_delay : start delay in seconds, forwarded to mainThreadTask
# Each output line of a successful host has the form from.to.ratio with all
# three fields base64 encoded; the decoded triples are appended to
# @RESULT_ARRAY. When a host fails, its output is logged and netperf and
# netserver are killed on all hosts.
# Returns 1 when every host succeeded, otherwise 0.
sub startMainTask{
    my ($max_delay) = @_;
    mainKillAll();
    my $all_host_string = join(".",map { encode($_) } @HOST_ARRAY);
    # The map block runs threads->new in list context, so join below
    # returns both values of mainThreadTask.
    my @workers = map {
        threads->new(\&mainThreadTask,$_,$max_delay,$HOST_TIME_BASE{$_},$all_host_string)
    } @HOST_ARRAY;
    my $success = 1;
    for my $worker(@workers){
        my ($output,$code) = $worker->join;
        if($code != 0){
            $success = 0;
            mainKillAll();
            logMessage($output);
            next;
        }
        for my $line(split("\n",trim($output))){
            my ($from_encoded,$to_encoded,$ratio_encoded) = split(/\./,$line);
            push @RESULT_ARRAY,[decode($from_encoded),decode($to_encoded),decode($ratio_encoded)];
        }
    }
    mainKillAll();
    return $success;
}
# Thread body: run one netperf test from the local host against $hostname.
#   $hostname : host running netserver on port 23000
# Returns ($hostname,$first_line,$code):
#   $first_line : first line of the netperf output, trimmed
#   $code       : exit code of the timeout/netperf call
# The first line is picked in Perl instead of piping through `head` with
# `set -o pipefail`: that option is not accepted by every /bin/sh, and
# without a pipe the exit code is that of timeout/netperf directly.
sub subThreadTask{
    my ($hostname) = @_;
    # allow a few seconds on top of the test duration before killing netperf
    my $timeout = $DURATION + 3;
    if($TEST_TYPE eq "UDP_STREAM"){
        $timeout *= 2;
    }
    my $command = qq{timeout -s 9 $timeout netperf -H $hostname -l $DURATION -p 23000 -f M -4 -P 0 -t $TEST_TYPE};
    my ($output,$code) = executeCommand($command);
    my ($first_line) = split(/\n/,$output);
    $first_line = "" if !defined $first_line;
    return ($hostname,trim($first_line),$code);
}
# Kill every local process whose `ps ax` line matches netperf or netserver.
sub subKillAll{
    executeCommand(q#ps ax|egrep 'netperf|netserver'|egrep -vw 'grep|egrep'|awk '{system("kill -9 "$1)}'#);
}
# Sub task mode: start netserver locally, wait until $BASE_TIME, then run
# netperf against every other host in parallel and print the results.
#
# One line is printed per successful test: local host, target host and the
# last column of the netperf output, each base64 encoded and joined by '.'.
#
# Exit status 2: no netserver process is visible after starting it.
# Exit status 1: $BASE_TIME already lies in the past, or a test failed.
# A failed UDP_STREAM test is skipped instead of ending the run.
sub startSubTask{
    executeCommand(qq{netserver -p 23000 -4 > /dev/null 2>&1});
    my ($netserver_lines) = executeCommand(qq{ps ax|grep netserver|grep -vw grep|tee});
    if($netserver_lines eq ""){
        logMessage("Host $LOCAL_HOSTNAME start netserver failed");
        exit 2;
    }
    my $current_time = trim(readpipe("date +%s.%N"));
    my $need_wait = $BASE_TIME - $current_time;
    if($need_wait < 0){
        subKillAll();
        logMessage("Waiting time before current time");
        exit 1;
    }
    # usleep takes microseconds
    usleep($need_wait * 1000000);
    # The map block runs threads->new in list context, so join below
    # returns all three values of subThreadTask.
    my @workers = map { threads->new(\&subThreadTask,$_) }
                  grep { $_ ne $LOCAL_HOSTNAME } @HOST_ARRAY;
    for my $worker(@workers){
        my ($hostname,$output,$code) = $worker->join;
        if($code == 0){
            push @RESULT_ARRAY,[$hostname,$output];
            next;
        }
        next if $TEST_TYPE eq "UDP_STREAM";
        subKillAll();
        logMessage($output);
        sleep 1;
        exit 1;
    }
    my $local_hostname_encode = encode($LOCAL_HOSTNAME);
    for my $row(@RESULT_ARRAY){
        my ($hostname,$output) = @$row;
        my @columns = split(/\s+/,$output);
        print join(".",$local_hostname_encode,encode($hostname),encode($columns[-1]))."\n";
    }
}
# Print the report for the rows in @RESULT_ARRAY (from host, to host, ratio):
# the host pair with the highest and with the lowest ratio, followed by the
# summed ratio every host sent and every host received.
# Host names are padded on the right to the longest name, numbers are
# aligned on the right to the longest formatted sum.
sub outputResult{
    my (%sent_by,%received_by);
    my ($name_width,$ratio_width) = (0,0);
    my ($max_from_host,$max_to_host,$max_ratio,$min_from_host,$min_to_host,$min_ratio);
    for my $row(@RESULT_ARRAY){
        my ($host_from,$host_to,$ratio) = @$row;
        for my $name($host_from,$host_to){
            $name_width = length($name) if length($name) > $name_width;
        }
        if($max_ratio eq "" || $ratio > $max_ratio){
            ($max_from_host,$max_to_host,$max_ratio) = ($host_from,$host_to,$ratio);
        }
        if($min_ratio eq "" || $min_ratio > $ratio){
            ($min_from_host,$min_to_host,$min_ratio) = ($host_from,$host_to,$ratio);
        }
        $sent_by{$host_from} = 0.0 if !exists $sent_by{$host_from};
        $sent_by{$host_from} += $ratio;
        $received_by{$host_to} = 0.0 if !exists $received_by{$host_to};
        $received_by{$host_to} += $ratio;
        # the widest formatted sum decides the width of the number column
        my $larger_sum = $sent_by{$host_from};
        if(!($sent_by{$host_from} > $received_by{$host_to})){
            $larger_sum = $received_by{$host_to};
        }
        my $sum_text = round($larger_sum,2);
        $ratio_width = length($sum_text) if length($sum_text) > $ratio_width;
    }
    my $host_pad = " " x $name_width;
    my $ratio_pad = " " x $ratio_width;
    # left aligned host name column
    my $name_cell = sub { return substr($_[0].$host_pad,0,$name_width); };
    # right aligned number column
    my $ratio_cell = sub { return substr($ratio_pad.$_[0],0 - $ratio_width); };
    # one line per host, sorted by name
    my $print_totals = sub {
        my ($totals) = @_;
        for my $hostname(sort(keys %$totals)){
            my $ratio = round($totals->{$hostname},2);
            print "    ".$name_cell->($hostname)."  :  ".$ratio_cell->($ratio)." MB\n";
        }
    };
    print "Run time duration : $DURATION S\n";
    print "Test name: $TEST_TYPE\n";
    print "Max host to host ratio is:\n";
    print "    ".$name_cell->($max_from_host)."  =>  ".$name_cell->($max_to_host)."  :  ".$ratio_cell->($max_ratio)." MB\n";
    print "Min host to host ratio is:\n";
    print "    ".$name_cell->($min_from_host)."  =>  ".$name_cell->($min_to_host)."  :  ".$ratio_cell->($min_ratio)." MB\n";
    print "\n";
    print "Host Send Ratio:\n";
    $print_totals->(\%sent_by);
    print "\n";
    print "Host Recieve Ratio:\n";
    $print_totals->(\%received_by);
}
# Program entry: parse and check the options, then run either the sub task
# (when --subtask was given) or the main task, which measures the start
# delay, runs the test on all hosts and prints the report on success.
sub main{
    getOption();
    checkEnvironment();
    checkOption();
    if($SUBTASK ne ""){
        startSubTask();
    }else{
        my $max_delay = mainCheckDelay();
        if(startMainTask($max_delay)){
            outputResult();
        }
    }
}
# The command line as it was typed; passed to main(), which does not read it.
my $command_string = $0." ".join(" ",@ARGV);
# Turn on autoflush for the currently selected output handle.
$| = 1;
main($command_string);
